package com.tpvlog.dfs.datanode.server;

import com.tpvlog.dfs.rpc.service.*;
import io.grpc.ManagedChannel;
import io.grpc.netty.shaded.io.grpc.netty.NegotiationType;
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;

/**
 * 负责跟一组NameNode中的某一个进行通信的线程组件
 *
 * @author Ressmix
 */
public class NameNodeConnActor {

    // 我这里直接写死NameNode的信息，读者可以自己完善，从配置文件读取
    private static final String NAMENODE_HOSTNAME = "localhost";
    private static final Integer NAMENODE_PORT = 50070;

    private NameNodeServiceGrpc.NameNodeServiceBlockingStub namenode;

    public NameNodeConnActor() {
        ManagedChannel channel = NettyChannelBuilder
                .forAddress(NAMENODE_HOSTNAME, NAMENODE_PORT)
                .negotiationType(NegotiationType.PLAINTEXT)
                .build();
        this.namenode = NameNodeServiceGrpc.newBlockingStub(channel);
    }

    public void startRegister() {
        Thread registerThread = new RegisterThread();
        registerThread.start();
        try {
            registerThread.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void startHeartbeat() {
        new HeartbeatThread().start();
    }

    /**
     * 负责注册的线程
     */
    class RegisterThread extends Thread {
        @Override
        public void run() {
            try {
                System.out.println("发送RPC请求到NameNode进行注册.......");

                // 当前DataNode节点的信息，我这里直接写死了
                // 大家可以自己完善，比如加载配置文件读取
                String ip = "127.0.0.1";
                String hostname = "dfs-datanode-01";

                // RPC调用，向NameNode发送注册请求
                RegisterRequest request = RegisterRequest.newBuilder()
                        .setIp(ip)
                        .setHostname(hostname)
                        .build();
                RegisterResponse response = namenode.register(request);
                System.out.println("接收到NameNode返回的注册响应：" + response.getStatus());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

    }

    /**
     * 负责心跳的线程
     */
    class HeartbeatThread extends Thread {
        @Override
        public void run() {
            try {
                while (true) {
                    System.out.println("发送RPC请求到NameNode进行心跳.......");

                    // 当前DataNode节点的信息，我这里直接写死了
                    // 大家可以自己完善，比如加载配置文件读取
                    String ip = "127.0.0.1";
                    String hostname = "dfs-datanode-01";

                    // 通过RPC接口发送到NameNode他的注册接口上去
                    HeartbeatRequest request = HeartbeatRequest.newBuilder()
                            .setIp(ip)
                            .setHostname(hostname)
                            .build();
                    HeartbeatResponse response = namenode.heartbeat(request);
                    System.out.println("接收到NameNode返回的心跳响应：" + response.getStatus());

                    // 每隔30秒发送一次心跳
                    Thread.sleep(30 * 1000);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
